package file

import (
	"context"
	"errors"
	"fmt"
	"gitee.com/xiao_hange/go-admin-file/file/repository"
	"gitee.com/xiao_hange/go-admin-pkg/pkg/logger"
	"gitee.com/xiao_hange/go-admin-pkg/pkg/saramax"
	"github.com/IBM/sarama"
	"sync"
	"time"
)

type InteractiveReadEventFileConsumer struct {
	client      sarama.Client
	l           logger.AppLogger
	repo        repository.FileRepository
	mutex       sync.RWMutex
	groupIDName string
}

func NewInteractiveReadEventFileConsumer(client sarama.Client, l logger.AppLogger, repo repository.FileRepository) *InteractiveReadEventFileConsumer {
	return &InteractiveReadEventFileConsumer{
		client:      client,
		l:           l,
		repo:        repo,
		groupIDName: "Send_File_For_Mail",
	}
}

func (k *InteractiveReadEventFileConsumer) Start() error {
	cg, err := sarama.NewConsumerGroupFromClient(k.groupIDName, k.client)

	if err != nil {
		return err
	}

	go func() {
		err = cg.Consume(context.Background(), []string{"file_export"}, saramax.NewHandler[ReadEvent](k.l, k.groupIDName, k.Consumer))
		if err != nil {
			if errors.Is(saramax.KafkaServerErr, err) {
				for i := 1; i < 10; i++ {
					duration := 20 * time.Second
					timeG, _ := k.reconnectConsumerGroup(cg, i)
					k.l.Warn(fmt.Sprintf("重启第 %d 次失败", i))
					if duration.Milliseconds() < timeG {
						i = 1
					}
					time.Sleep(time.Second * 5)
				}
			} else {
				k.l.Error("退出了消费循环异常", logger.Error(err))
			}
		}
	}()
	return nil
}

// Consumer 具体业务消费 对应在 repo 中
func (k *InteractiveReadEventFileConsumer) Consumer(msg *sarama.ConsumerMessage, t ReadEvent) error {
	//_, cancel := context.WithTimeout(context.Background(), time.Second)
	//defer cancel()
	k.l.Info("KafKa-send-mail-for-csv", logger.String("val", "开始消费"), logger.String("邮件", t.Mail))
	err := k.repo.SendMailForCsv(context.Background(), t.Mail)
	if err != nil {
		k.l.Error("KafKa-send-mail-for-csv", logger.Error(err))
	}
	k.l.Info("KafKa-send-mail-for-csv", logger.String("val", "发送成功"))
	return nil
}

func (k *InteractiveReadEventFileConsumer) reconnectConsumerGroup(cg sarama.ConsumerGroup, i int) (int64, error) {
	k.mutex.Lock()
	defer k.mutex.Unlock()
	startTime := time.Now()
	k.l.Warn(fmt.Sprintf("进行第 %d 重新连接", i))
	newCg, err := sarama.NewConsumerGroupFromClient(k.groupIDName, k.client)
	if err != nil {
		return 0, err
	}
	_ = cg.Close()

	// 启动新的消费者组循环
	err = newCg.Consume(context.Background(), []string{"file_export"}, saramax.NewHandler[ReadEvent](k.l, k.groupIDName, k.Consumer))
	if err != nil {
		return time.Since(startTime).Milliseconds(), err
	}
	return 0, nil
}
